### 查询catalina.out下 "mType":"AAA",出现的次数
[root@iZbp18nyz28va2zwadft2qZ logs]# grep -o '"mType":"AAA",' catalina.out | wc  -l
680
[root@iZbp18nyz28va2zwadft2qZ logs]# 
[root@iZbp18nyz28va2zwadft2qZ logs]# grep -o '"mType":"AAB"' catalina.out | wc  -l
612

数字统计出来了,AAA是下单的次数,AAB是服务器返回给终端的次数,不对等

那继续昨天任务处理类的测试问题

//原来项目里这样创建线程池,当任务超过这个额定量,
//默认拒绝抛异常的策略new ThreadPoolExecutor.AbortPolicy()
private ExecutorService threadPools = Executors.newFixedThreadPool(300);

仔细看了这种创建线程方式的

    /**
     * Creates a thread pool that reuses a fixed number of threads
     * operating off a shared unbounded queue.  At any point, at most
     * {@code nThreads} threads will be active processing tasks.
     * If additional tasks are submitted when all threads are active,
     * they will wait in the queue until a thread is available.
     * If any thread terminates due to a failure during execution
     * prior to shutdown, a new one will take its place if needed to
     * execute subsequent tasks.  The threads in the pool will exist
     * until it is explicitly {@link ExecutorService#shutdown shutdown}.
     *
     * @param nThreads the number of threads in the pool
     * @return the newly created thread pool
     * @throws IllegalArgumentException if {@code nThreads <= 0}
     */
    public static ExecutorService newFixedThreadPool(int nThreads) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>());
    }

翻译为:

 /** 
   *创建一个重用固定数量线程的线程池*在一个共享的无界队列上操作。在任何时候,
   * {@code nThreads}个线程将是活动的处理任务。 
   *如果在所有线程都处于活动状态时提交了其他任务,则
   *它们将在队列中等待,直到线程可用为止。 
   *如果任何线程由于执行过程中的失败而终止
   *在关机之前,如果需要一个新线程将代替
   *执行后续任务。直到显式{@link ExecutorService#shutdown shutdown}之前,池中的线程将一直存在
   * @param nThreads池中的线程数
   * @返回新创建的线程池
   * @throws IllegalArgumentException如果{@code nThreads <= 0} 
   * /

意为固定可重用的无界队列,并且线程池的线程数大小为new FixedThreadPool(300),并且他这个拒绝策略是不会报异常的那种 CallerRunsPolicy

这里引申出线程池的四种拒绝策略:

ThreadPoolExecutor.AbortPolicy:丢弃任务并抛出RejectedExecutionException异常。
ThreadPoolExecutor.DiscardPolicy:丢弃任务,但是不抛出异常。
ThreadPoolExecutor.DiscardOldestPolicy:丢弃队列最前面的任务,然后重新提交被拒绝的任务
ThreadPoolExecutor.CallerRunsPolicy:由调用线程(提交任务的线程)处理该任务

所以这样创建线程池直接使用提交任务的线程去处理当前任务,导致MQ消息处理的消息丢失,困扰了那么久的问题原来是这个

那就解决它:

线程池的最佳参数设置:参考《Java并发编程实战》

image.png

我自己的服务器是双核4G,所以我核心处理为2,拒绝策略设置为1是因为如果出现了线程池不够用,我可以看到报RejectedExecutionHandler异常来定位

private ThreadPoolExecutor threadPools = new ThreadPoolExecutor(
            2,
            10,
            1,
            TimeUnit.SECONDS,
            new LinkedBlockingQueue<>(1024), threadFactory,
            new ThreadPoolExecutor.AbortPolicy()
    );

这种情况下测试没有出现丢消息的问题,但是不排除会出现OOM,后续再测试。

目前的测试数已经能保证平台机器最大并发可用

image.png

线程不能乱用,随意用,不然会挖大坑!!